{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Benchmark GPU vs CPU\n",
    "---\n",
    "\n",
    "This notebook benchmarks training throughput of a Keras model (VGG19 on synthetic data) on CPU, a single GPU or several GPUs."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 1,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Starting Spark application\n"
     ]
    },
    {
     "data": {
      "text/html": [
       "<table>\n",
       "<tr><th>ID</th><th>YARN Application ID</th><th>Kind</th><th>State</th><th>Spark UI</th><th>Driver log</th><th>Current session?</th></tr><tr><td>1102</td><td>application_1549714785003_1348</td><td>pyspark</td><td>idle</td><td><a target=\"_blank\" href=\"http://hadoop33:8088/proxy/application_1549714785003_1348/\">Link</a></td><td><a target=\"_blank\" href=\"http://gpu1:8042/node/containerlogs/container_e70_1549714785003_1348_01_000001/dwadaw__robin_er\">Link</a></td><td>✔</td></tr></table>"
      ],
      "text/plain": [
       "<IPython.core.display.HTML object>"
      ]
     },
     "metadata": {},
     "output_type": "display_data"
    },
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "SparkSession available as 'spark'.\n"
     ]
    }
   ],
   "source": [
    "def wrapper():\n",
    "    \"\"\"Benchmark training of a randomly initialised VGG19 on synthetic data.\n",
    "\n",
    "    The Keras model is converted to an Estimator whose RunConfig uses\n",
    "    MirroredStrategy for training, then run with tf.estimator.train_and_evaluate.\n",
    "    The Estimator's model_dir is the directory returned by hops' tensorboard.logdir().\n",
    "\n",
    "    All imports are done inside the function, presumably so that it is\n",
    "    self-contained when executed inside the Spark executor - TODO confirm.\n",
    "    \"\"\"\n",
    "    import tensorflow as tf\n",
    "\n",
    "    # Wrapper for keras_applications, you can import any model you want to try (like ResNet50)\n",
    "    from tensorflow.keras.applications import VGG19\n",
    "\n",
    "    import numpy as np\n",
    "\n",
    "    from hops import tensorboard\n",
    "\n",
    "    # Utility module for getting number of GPUs accessible by the container (Spark Executor).\n",
    "    # Not referenced below yet.\n",
    "    from hops import devices\n",
    "\n",
    "    batch_size = 32 # Number of samples to process on each GPU every iteration\n",
    "\n",
    "    # Image dimensions\n",
    "    height = 224\n",
    "    width = 224\n",
    "    channels = 3\n",
    "    num_classes = 1000\n",
    "\n",
    "    num_iterations = 5000 # Number of iterations, increase to run longer\n",
    "\n",
    "    # weights=None: start from random weights, nothing is downloaded\n",
    "    model = VGG19(weights=None, input_shape=(height, width, channels), classes=num_classes)\n",
    "\n",
    "    optimizer = tf.train.RMSPropOptimizer(0.2)\n",
    "    model.compile(loss='categorical_crossentropy', optimizer=optimizer)\n",
    "\n",
    "    # Read synthetic data (can be replaced with real data)\n",
    "    def input_fn():\n",
    "        \"\"\"Return a dataset that yields the same random batch num_iterations times.\"\"\"\n",
    "        # Generate float32 arrays directly so the dataset dtype matches Keras' default float32\n",
    "        # (np.random.random returns float64).\n",
    "        data = np.random.random((batch_size, height, width, channels)).astype(np.float32)\n",
    "        labels = np.random.random((batch_size, num_classes)).astype(np.float32)\n",
    "        dataset = tf.data.Dataset.from_tensor_slices((data, labels))\n",
    "        # batch_size samples repeated num_iterations times, then grouped batch_size at a time\n",
    "        dataset = dataset.repeat(num_iterations)\n",
    "        dataset = dataset.batch(batch_size)\n",
    "        return dataset\n",
    "\n",
    "    tf.keras.backend.set_learning_phase(True)\n",
    "\n",
    "    # Define DistributionStrategies and convert the Keras Model to an\n",
    "    # Estimator that utilizes these DistributionStrategies.\n",
    "    # Evaluator is a single worker, so using MirroredStrategy.\n",
    "    # Training is automatically distributed on all available GPUs when using MirroredStrategy\n",
    "    run_config = tf.estimator.RunConfig(\n",
    "            train_distribute=tf.contrib.distribute.MirroredStrategy())\n",
    "    keras_estimator = tf.keras.estimator.model_to_estimator(keras_model=model, \n",
    "               config=run_config, model_dir=tensorboard.logdir())\n",
    "\n",
    "    # Train and evaluate the model. Evaluation will be skipped if there is not an\n",
    "    # \"evaluator\" job in the cluster.\n",
    "    tf.estimator.train_and_evaluate(keras_estimator, train_spec=tf.estimator.TrainSpec(input_fn=input_fn),\n",
    "        eval_spec=tf.estimator.EvalSpec(input_fn=input_fn))"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "from hops import experiment\n",
    "\n",
    "# Hand the benchmark function `wrapper` (defined in the previous cell) to the hops experiment launcher.\n",
    "# NOTE(review): local_logdir=True presumably keeps the logs on the executor's local disk\n",
    "# during the run - confirm against the hops documentation.\n",
    "experiment.launch(wrapper, local_logdir=True)"
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "PySpark",
   "language": "",
   "name": "pysparkkernel"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "python",
    "version": 2
   },
   "mimetype": "text/x-python",
   "name": "pyspark",
   "pygments_lexer": "python2"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 2
}